//创建过程中传入的Observer final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) { this.observer = observer; }
//OnNext方法 @Override publicvoidonNext(T t){ //非空检查,onNext在2.0之后不允许传入null值作为参数 if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } //这个对应上我们的上一篇博客,一次性水管,如果isDisposed为true,则发射器发出的事件,将不会被观察者执行 if (!isDisposed()) { observer.onNext(t); } }
@Override publicbooleantryOnError(Throwable t){ //也是不允许传入null if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } //如果isDisposed为false,执行观察者的onError方法,然后执行dispose()操作,也就是观察者不处理后面发射器发送的事件了。估计onComplete()方法中也会有类似的操作流程 if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } returntrue; } //只有当isDisposed为true的时候回返回false,也就是上一个方法回执行RxJavaPlugins.onError(t);操作 returnfalse; }
publicfinalvoidsubscribe(Observer<? super T> observer){ //observer非空检查 ObjectHelper.requireNonNull(observer, "observer is null"); try { //关联observable和observer observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); //这个方法在Observable中是个抽象方法,但是结合上面Observerable的create过程,可以知道这里实际上调用的是ObservableCreate的subscribeActual方法,也就是上面我们分析的过程,没毛病 subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); // can't call onError because no way to know if a Disposable has been set or not // can't call onSubscribe because the call might have set a Subscription already RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } }
/** The name of the system property for setting the thread priority for this Scheduler. */ privatestaticfinal String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static { int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY, Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority); }
/**
 * Schedules the given task with a delay of zero by delegating to the
 * three-argument {@code scheduleDirect} overload.
 *
 * @param run the task to schedule
 * @return whatever the delegated overload returns
 */
public Disposable scheduleDirect(@NonNull Runnable run) {
    final long noDelay = 0L;
    return scheduleDirect(run, noDelay, TimeUnit.NANOSECONDS);
}
// Delayed overload of scheduleDirect (excerpt):
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit){ //创建工作线程,以NewThreadScheduler为例,是创建NewThreadWorker final Worker w = createWorker(); // final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
/**
 * Schedules an action on this worker.
 *
 * <p>When the {@code disposed} flag is set, nothing is scheduled and
 * {@code EmptyDisposable.INSTANCE} is returned. Otherwise the call is forwarded
 * to {@code scheduleActual} with {@code null} as its fourth argument.
 *
 * @param action the task to run
 * @param delayTime the delay amount, forwarded unchanged
 * @param unit the unit of {@code delayTime}, forwarded unchanged
 * @return {@code EmptyDisposable.INSTANCE} if disposed, otherwise the result of
 *         {@code scheduleActual}
 */
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (!disposed) {
        return scheduleActual(action, delayTime, unit, null);
    }
    return EmptyDisposable.INSTANCE;
}